From 3b2b87707072e5dc9221a5ba3c727c70db13a593 Mon Sep 17 00:00:00 2001
From: Daniel Bankhead
Date: Wed, 10 May 2023 05:09:25 -0700
Subject: [PATCH] perf: Improve Multiple Chunk Upload Performance (#2185)

* perf: Improve Multiple Chunk Upload Performance

* perf: Keep Buffers in Arrays

* doc: `prependLocalBufferToUpstream`

* refactor: Remove `bufferEncoding`

* perf: Request Data From Upstream Immediately Upon Consuming
---
 src/file.ts              |   2 +
 src/resumable-upload.ts  | 197 ++++++++++++++++++------------
 test/resumable-upload.ts | 251 ++++++++++++++++++++++++---------------
 3 files changed, 284 insertions(+), 166 deletions(-)

diff --git a/src/file.ts b/src/file.ts
index 3813e32f1..1237fd01d 100644
--- a/src/file.ts
+++ b/src/file.ts
@@ -199,6 +199,7 @@ export type PredefinedAcl =
 export interface CreateResumableUploadOptions {
   chunkSize?: number;
+  highWaterMark?: number;
   metadata?: Metadata;
   origin?: string;
   offset?: number;
@@ -3853,6 +3854,7 @@ class File extends ServiceObject {
       retryOptions: {...retryOptions},
       params: options?.preconditionOpts || this.instancePreconditionOpts,
       chunkSize: options?.chunkSize,
+      highWaterMark: options?.highWaterMark,
     });
 
     uploadStream
diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts
index 647d37d8a..da90bdca1 100644
--- a/src/resumable-upload.ts
+++ b/src/resumable-upload.ts
@@ -23,7 +23,7 @@ import {
 } from 'gaxios';
 import * as gaxios from 'gaxios';
 import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library';
-import {Readable, Writable} from 'stream';
+import {Readable, Writable, WritableOptions} from 'stream';
 import retry = require('async-retry');
 import {RetryOptions, PreconditionOptions} from './storage';
 import * as uuid from 'uuid';
@@ -62,7 +62,7 @@ export interface QueryParameters extends PreconditionOptions {
   userProject?: string;
 }
 
-export interface UploadConfig {
+export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
   /**
    * The API endpoint used for the request.
    * Defaults to `storage.googleapis.com`.
@@ -260,20 +260,22 @@ export class Upload extends Writable {
     uri: uuid.v4(),
     offset: uuid.v4(),
   };
-  private upstreamChunkBuffer: Buffer = Buffer.alloc(0);
-  private chunkBufferEncoding?: BufferEncoding = undefined;
+  /**
+   * A cache of buffers written to this instance, ready for consuming
+   */
+  private writeBuffers: Buffer[] = [];
   private numChunksReadInRequest = 0;
   /**
-   * A chunk used for caching the most recent upload chunk.
+   * An array of buffers used for caching the most recent upload chunk.
    * We should not assume that the server received all bytes sent in the request.
    * - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
    */
-  private lastChunkSent = Buffer.alloc(0);
+  private localWriteCache: Buffer[] = [];
+  private localWriteCacheByteLength = 0;
   private upstreamEnded = false;
 
   constructor(cfg: UploadConfig) {
-    super();
-
+    super(cfg);
     cfg = cfg || {};
 
     if (!cfg.bucket || !cfg.file) {
@@ -391,24 +393,73 @@ export class Upload extends Writable {
     // Backwards-compatible event
     this.emit('writing');
 
-    this.upstreamChunkBuffer = Buffer.concat([
-      this.upstreamChunkBuffer,
-      typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk,
-    ]);
-    this.chunkBufferEncoding = encoding;
+    this.writeBuffers.push(
+      typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk
+    );
 
     this.once('readFromChunkBuffer', readCallback);
     process.nextTick(() => this.emit('wroteToChunkBuffer'));
   }
 
+  #resetLocalBuffersCache() {
+    this.localWriteCache = [];
+    this.localWriteCacheByteLength = 0;
+  }
+
+  #addLocalBufferCache(buf: Buffer) {
+    this.localWriteCache.push(buf);
+    this.localWriteCacheByteLength += buf.byteLength;
+  }
+
   /**
-   * Prepends data back to the upstream chunk buffer.
+   * Prepends the local buffer to the write buffer and resets it.
    *
-   * @param chunk The data to prepend
+   * @param keepLastBytes number of bytes to keep from the end of the local buffer.
    */
-  private unshiftChunkBuffer(chunk: Buffer) {
-    this.upstreamChunkBuffer = Buffer.concat([chunk, this.upstreamChunkBuffer]);
+  private prependLocalBufferToUpstream(keepLastBytes?: number) {
+    // Typically, the upstream write buffers should be smaller than the local
+    // cache, so we can save time by setting the local cache as the new
+    // upstream write buffer array and appending the old array to it
+    let initialBuffers: Buffer[] = [];
+
+    if (keepLastBytes) {
+      // we only want the last X bytes
+      let bytesKept = 0;
+
+      while (keepLastBytes > bytesKept) {
+        // load backwards because we want the last X bytes
+        // note: `localWriteCacheByteLength` is reset below
+        let buf = this.localWriteCache.pop();
+        if (!buf) break;
+
+        bytesKept += buf.byteLength;
+
+        if (bytesKept > keepLastBytes) {
+          // we have gone over the amount desired, let's keep the last X bytes
+          // of this buffer
+          const diff = bytesKept - keepLastBytes;
+          buf = buf.subarray(diff);
+          bytesKept -= diff;
+        }
+
+        initialBuffers.unshift(buf);
+      }
+    } else {
+      // we're keeping all of the local cache, simply use it as the initial buffer
+      initialBuffers = this.localWriteCache;
+    }
+
+    // Append the old upstream to the new
+    const append = this.writeBuffers;
+    this.writeBuffers = initialBuffers;
+
+    for (const buf of append) {
+      this.writeBuffers.push(buf);
+    }
+
+    // reset last buffers sent
+    this.#resetLocalBuffersCache();
   }
 
   /**
@@ -417,15 +468,28 @@
    * @param limit The maximum amount to return from the buffer.
    * @returns The data requested.
    */
-  private pullFromChunkBuffer(limit: number) {
-    const chunk = this.upstreamChunkBuffer.slice(0, limit);
-    this.upstreamChunkBuffer = this.upstreamChunkBuffer.slice(limit);
+  private *pullFromChunkBuffer(limit: number) {
+    while (limit) {
+      const buf = this.writeBuffers.shift();
+      if (!buf) break;
+
+      let bufToYield = buf;
 
-    // notify upstream we've read from the buffer so it can potentially
-    // send more data down.
-    process.nextTick(() => this.emit('readFromChunkBuffer'));
+      if (buf.byteLength > limit) {
+        bufToYield = buf.subarray(0, limit);
+        this.writeBuffers.unshift(buf.subarray(limit));
+        limit = 0;
+      } else {
+        limit -= buf.byteLength;
+      }
+
+      yield bufToYield;
 
-    return chunk;
+      // Notify upstream we've read from the buffer and we're able to consume
+      // more. It can also potentially send more data down as we're currently
+      // iterating.
+      this.emit('readFromChunkBuffer');
+    }
   }
 
   /**
@@ -436,7 +500,7 @@
   private async waitForNextChunk(): Promise<boolean> {
     const willBeMoreChunks = await new Promise<boolean>(resolve => {
       // There's data available - it should be digested
-      if (this.upstreamChunkBuffer.byteLength) {
+      if (this.writeBuffers.length) {
        return resolve(true);
      }
 
@@ -457,7 +521,7 @@
         removeListeners();
 
         // this should be the last chunk, if there's anything there
-        if (this.upstreamChunkBuffer.length) return resolve(true);
+        if (this.writeBuffers.length) return resolve(true);
 
         return resolve(false);
       };
@@ -483,35 +547,16 @@
    * Ends when the limit has reached or no data is expected to be pushed from upstream.
    *
    * @param limit The most amount of data this iterator should return. `Infinity` by default.
-   * @param oneChunkMode Determines if one, exhaustive chunk is yielded for the iterator
    */
-  private async *upstreamIterator(limit = Infinity, oneChunkMode?: boolean) {
-    let completeChunk = Buffer.alloc(0);
-
+  private async *upstreamIterator(limit = Infinity) {
     // read from upstream chunk buffer
     while (limit && (await this.waitForNextChunk())) {
       // read until end or limit has been reached
-      const chunk = this.pullFromChunkBuffer(limit);
-
-      limit -= chunk.byteLength;
-      if (oneChunkMode) {
-        // return 1 chunk at the end of iteration
-        completeChunk = Buffer.concat([completeChunk, chunk]);
-      } else {
-        // return many chunks throughout iteration
-        yield {
-          chunk,
-          encoding: this.chunkBufferEncoding,
-        };
+      for (const chunk of this.pullFromChunkBuffer(limit)) {
+        limit -= chunk.byteLength;
+        yield chunk;
       }
     }
-
-    if (oneChunkMode) {
-      yield {
-        chunk: completeChunk,
-        encoding: this.chunkBufferEncoding,
-      };
-    }
   }
 
   createURI(): Promise<string>;
   createURI(
@@ -680,10 +725,7 @@
     }
 
     // A queue for the upstream data
-    const upstreamQueue = this.upstreamIterator(
-      expectedUploadSize,
-      multiChunkMode // multi-chunk mode should return 1 chunk per request
-    );
+    const upstreamQueue = this.upstreamIterator(expectedUploadSize);
 
     // The primary read stream for this request. This stream retrieves no more
     // than the exact requested amount from upstream.
@@ -696,15 +738,23 @@
         if (result.value) {
           this.numChunksReadInRequest++;
-          this.lastChunkSent = result.value.chunk;
-          this.numBytesWritten += result.value.chunk.byteLength;
+
+          if (multiChunkMode) {
+            // save every buffer used in the request in multi-chunk mode
+            this.#addLocalBufferCache(result.value);
+          } else {
+            this.#resetLocalBuffersCache();
+            this.#addLocalBufferCache(result.value);
+          }
+
+          this.numBytesWritten += result.value.byteLength;
 
           this.emit('progress', {
             bytesWritten: this.numBytesWritten,
            contentLength: this.contentLength,
          });
 
-          requestStream.push(result.value.chunk, result.value.encoding);
+          requestStream.push(result.value);
        }
 
        if (result.done) {
@@ -720,17 +770,21 @@
     // If using multiple chunk upload, set appropriate header
     if (multiChunkMode) {
       // We need to know how much data is available upstream to set the `Content-Range` header.
-      const oneChunkIterator = this.upstreamIterator(expectedUploadSize, true);
-      const {value} = await oneChunkIterator.next();
+      // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
+      for await (const chunk of this.upstreamIterator(expectedUploadSize)) {
+        // This will conveniently track and keep the size of the buffers
+        this.#addLocalBufferCache(chunk);
+      }
 
-      const bytesToUpload = value!.chunk.byteLength;
+      // We hit either the expected upload size or the remainder
+      const bytesToUpload = this.localWriteCacheByteLength;
 
       // Important: we want to know if the upstream has ended and the queue is empty before
       // unshifting data back into the queue. This way we will know if this is the last request or not.
       const isLastChunkOfUpload = !(await this.waitForNextChunk());
 
-      // Important: put the data back in the queue for the actual upload iterator
-      this.unshiftChunkBuffer(value!.chunk);
+      // Important: put the data back in the queue for the actual upload
+      this.prependLocalBufferToUpstream();
 
       let totalObjectSize = this.contentLength;
 
@@ -808,15 +862,14 @@
       // - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
       const missingBytes = this.numBytesWritten - this.offset;
       if (missingBytes) {
-        const dataToPrependForResending = this.lastChunkSent.slice(
-          -missingBytes
-        );
         // As multi-chunk uploads send one chunk per request and pulls one
         // chunk into the pipeline, prepending the missing bytes back should
         // be fine for the next request.
-        this.unshiftChunkBuffer(dataToPrependForResending);
+        this.prependLocalBufferToUpstream(missingBytes);
         this.numBytesWritten -= missingBytes;
-        this.lastChunkSent = Buffer.alloc(0);
+      } else {
+        // No bytes missing - no need to keep the local cache
+        this.#resetLocalBuffersCache();
       }
 
       // continue uploading next chunk
@@ -831,8 +884,8 @@
       this.destroy(err);
     } else {
-      // remove the last chunk sent to free memory
-      this.lastChunkSent = Buffer.alloc(0);
+      // no need to keep the cache
+      this.#resetLocalBuffersCache();
 
       if (resp && resp.data) {
         resp.data.size = Number(resp.data.size);
@@ -983,11 +1036,9 @@
       return;
     }
 
-    // Unshift the most recent chunk back in case it's needed for the next
-    // request.
-    this.numBytesWritten -= this.lastChunkSent.byteLength;
-    this.unshiftChunkBuffer(this.lastChunkSent);
-    this.lastChunkSent = Buffer.alloc(0);
+    // Unshift the local cache back in case it's needed for the next request.
+    this.numBytesWritten -= this.localWriteCacheByteLength;
+    this.prependLocalBufferToUpstream();
 
     // We don't know how much data has been received by the server.
     // `continueUploading` will recheck the offset via `getAndSetOffset`.
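Note: the heart of this change is swapping a single, repeatedly re-concatenated upstream buffer (`upstreamChunkBuffer`) for an array of buffers (`writeBuffers`) that is drained lazily by a generator. The following is a minimal, standalone sketch of that pattern in TypeScript; it is illustrative only (the `BufferQueue` name and shape are not part of the library), under the assumption that a Node `Buffer` environment is available.

// Sketch of the buffer-array technique used in `pullFromChunkBuffer` above.
// Writes push whole Buffers into an array; a generator drains up to `limit`
// bytes, splitting a buffer with `subarray` (a view, not a copy) only when it
// straddles the limit.
class BufferQueue {
  private buffers: Buffer[] = [];

  push(chunk: Buffer): void {
    // O(1): no Buffer.concat of the entire backlog on every write.
    this.buffers.push(chunk);
  }

  *pull(limit: number): Generator<Buffer> {
    while (limit > 0) {
      const buf = this.buffers.shift();
      if (!buf) break;

      if (buf.byteLength > limit) {
        // Yield only what was requested and put the remainder back in front.
        this.buffers.unshift(buf.subarray(limit));
        yield buf.subarray(0, limit);
        return;
      }

      limit -= buf.byteLength;
      yield buf;
    }
  }
}

// Usage: two writes, then drain 5 bytes and inspect what is left over.
const q = new BufferQueue();
q.push(Buffer.from('0123'));
q.push(Buffer.from('456789'));
console.log(Buffer.concat([...q.pull(5)]).toString());        // '01234'
console.log(Buffer.concat([...q.pull(Infinity)]).toString()); // '56789'

With the previous approach, every `_write` paid for a `Buffer.concat` over the whole backlog, so the cost of buffering many small chunks grew roughly quadratically with the number of pending writes; with the array, a write is a constant-time push and bytes are only sliced as views when a buffer straddles the requested limit.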
diff --git a/test/resumable-upload.ts b/test/resumable-upload.ts index 37b93ef71..6e3200d95 100644 --- a/test/resumable-upload.ts +++ b/test/resumable-upload.ts @@ -357,6 +357,26 @@ describe('resumable-upload', () => { assert.strictEqual(up.chunkSize, 123); }); + it('should have a default `writableHighWaterMark`', () => { + const up = upload({ + bucket: BUCKET, + file: FILE, + retryOptions: RETRY_OPTIONS, + }); + + assert(up.writableHighWaterMark); + }); + + it('should accept a `highWaterMark` and set a `writableHighWaterMark`', () => { + const up = upload({ + bucket: BUCKET, + file: FILE, + retryOptions: RETRY_OPTIONS, + highWaterMark: 123, + }); + assert.strictEqual(up.writableHighWaterMark, 123); + }); + describe('on write', () => { let uri = ''; @@ -446,8 +466,7 @@ describe('resumable-upload', () => { it('should handle writes to class', done => { up.on('wroteToChunkBuffer', () => { - assert.equal(up.upstreamChunkBuffer.byteLength, 16); - assert.equal(up.chunkBufferEncoding, 'buffer'); + assert.equal(up.writeBuffers[0].byteLength, 16); done(); }); @@ -495,29 +514,29 @@ describe('resumable-upload', () => { up.createURI = () => {}; }); - it('should append buffer to existing `upstreamChunkBuffer`', () => { - up.upstreamChunkBuffer = Buffer.from('abc'); + it('should append buffer to existing `writeBuffers`', () => { + up.writeBuffers = [Buffer.from('abc')]; up.write(Buffer.from('def')); assert.equal( - Buffer.compare(up.upstreamChunkBuffer, Buffer.from('abcdef')), + Buffer.compare(Buffer.concat(up.writeBuffers), Buffer.from('abcdef')), 0 ); }); - it('should convert string with encoding to Buffer and append to existing `upstreamChunkBuffer`', () => { + it('should convert string with encoding to Buffer and append to existing `writeBuffers`', () => { const existing = 'a '; const sample = '🦃'; const concat = existing + sample; - up.upstreamChunkBuffer = Buffer.from(existing); - assert.equal(up.chunkBufferEncoding, undefined); + up.writeBuffers = [Buffer.from(existing)]; up.write(sample, 'utf-8', () => {}); - assert(Buffer.isBuffer(up.upstreamChunkBuffer)); - assert.equal(up.upstreamChunkBuffer.toString(), concat); - assert.equal(up.chunkBufferEncoding, 'buffer'); + for (const buf of up.writeBuffers) { + assert(Buffer.isBuffer(buf)); + } + assert.equal(Buffer.concat(up.writeBuffers), concat); }); it("should callback on 'readFromChunkBuffer'", done => { @@ -534,61 +553,116 @@ describe('resumable-upload', () => { }); }); - describe('#unshiftChunkBuffer', () => { + describe('#prependLocalBufferToUpstream', () => { it('should synchronously prepend to existing buffer', () => { - up.upstreamChunkBuffer = Buffer.from('456'); + up.localWriteCache = [Buffer.from('123')]; + up.localWriteCacheByteLength = up.localWriteCache[0].byteLength; + up.writeBuffers = [Buffer.from('456')]; - up.unshiftChunkBuffer(Buffer.from('123')); - assert.equal( - Buffer.compare(up.upstreamChunkBuffer, Buffer.from('123456')), - 0 + up.prependLocalBufferToUpstream(); + + assert.equal(up.localWriteCache.length, 0); + // shouldn't concat any buffers, thus writeBuffers.length = 2 + assert.equal(up.writeBuffers.length, 2); + assert.equal(Buffer.concat(up.writeBuffers).toString(), '123456'); + assert.equal(up.localWriteCacheByteLength, 0); + }); + + it('should keep the desired last few bytes', () => { + up.localWriteCache = [Buffer.from('123'), Buffer.from('456')]; + up.localWriteCacheByteLength = up.localWriteCache.reduce( + (a: Buffer, b: number) => a.byteLength + b ); + up.writeBuffers = [Buffer.from('789')]; + + 
up.prependLocalBufferToUpstream(2); + + assert.equal(up.localWriteCache.length, 0); + // shouldn't concat any buffers, thus writeBuffers.length = 2 + assert.equal(up.writeBuffers.length, 2); + assert.equal(Buffer.concat(up.writeBuffers).toString(), '56789'); + assert.equal(up.localWriteCacheByteLength, 0); }); }); describe('#pullFromChunkBuffer', () => { - it('should retrieve from the beginning of the `upstreamChunkBuffer`', () => { - up.upstreamChunkBuffer = Buffer.from('ab'); + it('should retrieve from the beginning of the `writeBuffers`', () => { + up.writeBuffers = [Buffer.from('ab')]; - const chunk = up.pullFromChunkBuffer(1); + const [chunk] = [...up.pullFromChunkBuffer(1)]; assert.equal(chunk.toString(), 'a'); - assert.equal(up.upstreamChunkBuffer.toString(), 'b'); + assert.equal(up.writeBuffers.length, 1); + assert.equal(up.writeBuffers[0].toString(), 'b'); }); it('should retrieve no more than the limit provided', () => { - up.upstreamChunkBuffer = Buffer.from('0123456789'); + up.writeBuffers = [Buffer.from('0123456789')]; + + const chunks = [...up.pullFromChunkBuffer(4)]; + assert.equal(chunks.join('').toString(), '0123'); - const chunk = up.pullFromChunkBuffer(4); - assert.equal(chunk.toString(), '0123'); - assert.equal(up.upstreamChunkBuffer.toString(), '456789'); + // length should be 1 + assert.equal(up.writeBuffers.length, 1); + assert.equal(up.writeBuffers[0].toString(), '456789'); }); - it('should retrieve less than the limit if no more data is available', () => { - up.upstreamChunkBuffer = Buffer.from('0123456789'); + it('should retrieve less than the limit if no more data is available (single write)', () => { + up.writeBuffers = [Buffer.from('0123456789')]; - const chunk = up.pullFromChunkBuffer(512); - assert.equal(chunk.toString(), '0123456789'); - assert.equal(up.upstreamChunkBuffer.toString(), ''); + const chunks = [...up.pullFromChunkBuffer(512)]; + assert.equal(chunks.join('').toString(), '0123456789'); + assert.equal(up.writeBuffers.length, 0); + }); + + it('should retrieve less than the limit if no more data is available (multi write)', () => { + // an array of 1-char buffers + up.writeBuffers = '0123456789'.split('').map(c => Buffer.from(c)); + + const chunks = [...up.pullFromChunkBuffer(512)]; + assert.equal(chunks.join('').toString(), '0123456789'); + assert.equal(up.writeBuffers.length, 0); + }); + + it('should retrieve a subset of part of a buffer and prepend the remainder', () => { + up.writeBuffers = [ + Buffer.from('0'), + Buffer.from('123'), + Buffer.from('456'), // this buffer should be split + Buffer.from('789'), + ]; + + const chunks = [...up.pullFromChunkBuffer(5)]; + assert.equal(chunks.join('').toString(), '01234'); + assert.equal(up.writeBuffers.length, 2); + assert.equal(up.writeBuffers[0].toString(), '56'); + assert.equal(up.writeBuffers[1].toString(), '789'); }); it('should return all data if `Infinity` is provided', () => { - up.upstreamChunkBuffer = Buffer.from('0123456789'); - const chunk = up.pullFromChunkBuffer(Infinity); - assert.equal(chunk.toString(), '0123456789'); - assert.equal(up.upstreamChunkBuffer.toString(), ''); + up.writeBuffers = [Buffer.from('012345'), Buffer.from('6789')]; + const chunks = [...up.pullFromChunkBuffer(Infinity)]; + assert.equal(chunks.join('').toString(), '0123456789'); + assert.equal(up.writeBuffers.length, 0); }); - it("should emit 'readFromChunkBuffer' asynchronously", done => { - up.pullFromChunkBuffer(0); + it("should emit 'readFromChunkBuffer' synchronously on each iterator", () => { + up.writeBuffers = 
[Buffer.from('012345'), Buffer.from('6789')]; - // setting this here proves it's async - up.on('readFromChunkBuffer', done); + const iter = up.pullFromChunkBuffer(Infinity); + let count = 0; + let loop = 0; + + up.on('readFromChunkBuffer', () => count++); + + while (!iter.next().done) { + assert.equal(count, loop++); + } }); }); describe('#waitForNextChunk', () => { - it('should resolve `true` asynchronously if `upstreamChunkBuffer.byteLength` has data', async () => { - up.upstreamChunkBuffer = Buffer.from('ab'); + it('should resolve `true` asynchronously if `writeBuffers.length` has data', async () => { + up.writeBuffers = [Buffer.from('ab')]; assert(await up.waitForNextChunk()); }); @@ -599,14 +673,14 @@ describe('resumable-upload', () => { assert.equal(await up.waitForNextChunk(), false); }); - it('should resolve `true` asynchronously if `upstreamChunkBuffer.byteLength` and `upstreamEnded`', async () => { - up.upstreamChunkBuffer = Buffer.from('ab'); + it('should resolve `true` asynchronously if `writeBuffers.length` and `upstreamEnded`', async () => { + up.writeBuffers = [Buffer.from('ab')]; up.upstreamEnded = true; assert(await up.waitForNextChunk()); }); - it('should wait for `wroteToChunkBuffer` if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`', async () => { + it('should wait for `wroteToChunkBuffer` if !`writeBuffers.length` && !`upstreamEnded`', async () => { const result = await new Promise(resolve => { up.waitForNextChunk().then(resolve); up.emit('wroteToChunkBuffer'); @@ -615,7 +689,7 @@ describe('resumable-upload', () => { assert(result); }); - it("should wait for 'upstreamFinished' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => { + it("should wait for 'upstreamFinished' if !`writeBuffers.length` && !`upstreamEnded`", async () => { await new Promise(resolve => { up.waitForNextChunk().then(resolve); up.emit('upstreamFinished'); @@ -635,8 +709,8 @@ describe('resumable-upload', () => { const result = await new Promise(resolve => { up.on('newListener', (event: string) => { if (event === 'upstreamFinished') { - // Update the `upstreamChunkBuffer` before emitting 'upstreamFinished' - up.upstreamChunkBuffer = Buffer.from('abc'); + // Update the `writeBuffers` before emitting 'upstreamFinished' + up.writeBuffers = [Buffer.from('abc')]; process.nextTick(() => up.emit('upstreamFinished')); } @@ -648,7 +722,7 @@ describe('resumable-upload', () => { assert.equal(result, true); }); - it("should wait for 'upstreamFinished' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => { + it("should wait for 'upstreamFinished' if !`writeBuffers.length` && !`upstreamEnded`", async () => { await new Promise(resolve => { up.waitForNextChunk().then(resolve); up.emit('upstreamFinished'); @@ -668,8 +742,8 @@ describe('resumable-upload', () => { const result = await new Promise(resolve => { up.on('newListener', (event: string) => { if (event === 'upstreamFinished') { - // Update the `upstreamChunkBuffer` before emitting 'upstreamFinished' - up.upstreamChunkBuffer = Buffer.from('abc'); + // Update the `writeBuffers` before emitting 'upstreamFinished' + up.writeBuffers = [Buffer.from('abc')]; process.nextTick(() => up.emit('upstreamFinished')); } @@ -720,7 +794,9 @@ describe('resumable-upload', () => { describe('#upstreamIterator', () => { it('should yield all data from upstream by default', done => { - up.upstreamChunkBuffer = Buffer.alloc(1); + up.writeBuffers = [Buffer.alloc(1)]; + assert(up.writableHighWaterMark); + up.pullFromChunkBuffer = (limit: 
number) => { assert.equal(limit, Infinity); done(); @@ -731,11 +807,11 @@ describe('resumable-upload', () => { }); it('should yield up to limit if provided', async () => { - up.upstreamChunkBuffer = Buffer.alloc(16); + up.writeBuffers = [Buffer.alloc(16)]; let data = Buffer.alloc(0); - for await (const {chunk} of up.upstreamIterator(8)) { + for await (const chunk of up.upstreamIterator(8)) { data = Buffer.concat([data, chunk]); } @@ -743,26 +819,26 @@ describe('resumable-upload', () => { }); it("should yield less than the limit if that's all that's available", async () => { - up.upstreamChunkBuffer = Buffer.alloc(8); + up.writeBuffers = [Buffer.alloc(8)]; up.upstreamEnded = true; let data = Buffer.alloc(0); - for await (const {chunk} of up.upstreamIterator(16)) { + for await (const chunk of up.upstreamIterator(16)) { data = Buffer.concat([data, chunk]); } assert.equal(data.byteLength, 8); }); - it('should yield many, arbitrarily sized chunks by default', async () => { + it('should yield many, arbitrarily sized chunks', async () => { up.waitForNextChunk = () => true; - up.pullFromChunkBuffer = () => Buffer.from('a'); + up.pullFromChunkBuffer = () => [Buffer.from('a')]; let data = Buffer.alloc(0); let count = 0; - for await (const {chunk} of up.upstreamIterator(16)) { + for await (const chunk of up.upstreamIterator(16)) { data = Buffer.concat([data, chunk]); count++; } @@ -770,22 +846,6 @@ describe('resumable-upload', () => { assert.equal(data.toString(), 'a'.repeat(16)); assert.equal(count, 16); }); - - it('should yield one single chunk if `oneChunkMode`', async () => { - up.waitForNextChunk = () => true; - up.pullFromChunkBuffer = () => Buffer.from('b'); - - let data = Buffer.alloc(0); - let count = 0; - - for await (const {chunk} of up.upstreamIterator(16, true)) { - data = Buffer.concat([data, chunk]); - count++; - } - - assert.equal(data.toString(), 'b'.repeat(16)); - assert.equal(count, 1); - }); }); describe('#createURI', () => { @@ -939,7 +999,7 @@ describe('resumable-upload', () => { describe('#startUploading', () => { beforeEach(() => { up.makeRequestStream = async () => null; - up.upstreamChunkBuffer = Buffer.alloc(16); + up.writeBuffers = [Buffer.alloc(16)]; }); it('should reset `numChunksReadInRequest` to 0', async () => { @@ -980,7 +1040,7 @@ describe('resumable-upload', () => { }); it("should 'fast-forward' upstream if `numBytesWritten` < `offset`", async () => { - up.upstreamChunkBuffer = Buffer.alloc(24); + up.writeBuffers = [Buffer.alloc(24)]; up.offset = 9; up.numBytesWritten = 1; @@ -990,11 +1050,12 @@ describe('resumable-upload', () => { // Should fast-forward (up.offset - up.numBytesWritten) bytes assert.equal(up.offset, 9); assert.equal(up.numBytesWritten, 9); - assert.equal(up.upstreamChunkBuffer.byteLength, 16); + assert.equal(up.writeBuffers.length, 1); + assert.equal(up.writeBuffers[0].byteLength, 16); }); it('should emit a progress event with the bytes written', done => { - up.upstreamChunkBuffer = Buffer.alloc(24); + up.writeBuffers = [Buffer.alloc(24)]; up.upstreamEnded = true; up.contentLength = 24; @@ -1077,7 +1138,7 @@ describe('resumable-upload', () => { reqOpts = requestOptions; }; - up.upstreamChunkBuffer = Buffer.alloc(UPSTREAM_BUFFER_SIZE); + up.writeBuffers = [Buffer.alloc(UPSTREAM_BUFFER_SIZE)]; up.upstreamEnded = UPSTREAM_ENDED; }); @@ -1304,28 +1365,30 @@ describe('resumable-upload', () => { up.chunkSize = 256; up.numBytesWritten = NUM_BYTES_WRITTEN; - up.upstreamChunkBuffer = Buffer.alloc(UPSTREAM_BUFFER_LENGTH, 'b'); + up.writeBuffers = 
[Buffer.alloc(UPSTREAM_BUFFER_LENGTH, 'b')]; - up.lastChunkSent = Buffer.concat([ + up.localWriteCache = [ Buffer.alloc(LAST_CHUNK_LENGTH, 'c'), // different to ensure this is the data that's prepended Buffer.alloc(expectedUnshiftAmount, 'a'), - ]); + ]; up.continueUploading = () => { assert.equal(up.offset, lastByteReceived + 1); assert.equal( - up.upstreamChunkBuffer.byteLength, + Buffer.concat(up.writeBuffers).byteLength, UPSTREAM_BUFFER_LENGTH + expectedUnshiftAmount ); assert.equal( - up.upstreamChunkBuffer.slice(0, expectedUnshiftAmount).toString(), + Buffer.concat(up.writeBuffers) + .subarray(0, expectedUnshiftAmount) + .toString(), 'a'.repeat(expectedUnshiftAmount) ); // we should discard part of the last chunk, as we know what the server // has at this point. - assert.equal(up.lastChunkSent.byteLength, 0); + assert.deepEqual(up.localWriteCache, []); done(); }; @@ -1872,13 +1935,13 @@ describe('resumable-upload', () => { up.attemptDelayedRetry({}); }); - it('should unshift last buffer, unset `offset`, and call `continueUploading` when not calling `startUploading`', done => { + it('should unshift the write buffer, unset `offset`, and call `continueUploading` when not calling `startUploading`', done => { up.startUploading = () => done('wanted `continueUploading`'); up.continueUploading = () => { assert.equal(up.numBytesWritten, 4); - assert.equal(up.lastChunkSent.byteLength, 0); + assert.equal(up.localWriteCache.length, 0); assert.equal( - up.upstreamChunkBuffer.toString(), + Buffer.concat(up.writeBuffers).toString(), 'a'.repeat(12) + 'b'.repeat(10) ); assert.equal(up.offset, undefined); @@ -1887,8 +1950,9 @@ describe('resumable-upload', () => { }; up.numBytesWritten = 16; - up.lastChunkSent = Buffer.alloc(12, 'a'); - up.upstreamChunkBuffer = Buffer.alloc(10, 'b'); + up.localWriteCache = [Buffer.alloc(12, 'a')]; + up.localWriteCacheByteLength = up.localWriteCache[0].byteLength; + up.writeBuffers = [Buffer.alloc(10, 'b')]; up.offset = 16; up.attemptDelayedRetry({}); @@ -2162,7 +2226,7 @@ describe('resumable-upload', () => { assert.strictEqual(request.opts.method, 'PUT'); assert.strictEqual(request.opts.url, uri); - // We should be writing multiple chunks down the wire + // We should be writing multiple buffers down the wire assert(request.chunkWritesInRequest > 1); assert.equal(request.dataReceived, CONTENT_LENGTH); @@ -2324,8 +2388,9 @@ describe('resumable-upload', () => { assert.strictEqual(request.opts.method, 'PUT'); assert.strictEqual(request.opts.url, uri); - // We should be writing 1, single chunk down the wire - assert.strictEqual(request.chunkWritesInRequest, 1); + // We should be writing N buffers down the wire, although + // the request is "1 chunk" + assert(request.chunkWritesInRequest >= 1); if (requests.length - i === 1) { // The last chunk @@ -2445,7 +2510,7 @@ describe('resumable-upload', () => { assert.strictEqual(request.opts.method, 'PUT'); assert.strictEqual(request.opts.url, uri); - // We should be writing multiple chunks down the wire + // No data should be written assert(request.chunkWritesInRequest === 0); assert.equal(request.dataReceived, CONTENT_LENGTH);