Skip to content

Commit

Permalink
[Storage] Cleared timeouts upon pause/resume of uploads (#6667)
Browse files Browse the repository at this point in the history
  • Loading branch information
maneesht authored Oct 12, 2022
1 parent 48a127b commit 5f55ed8
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 21 deletions.
5 changes: 5 additions & 0 deletions .changeset/great-houses-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@firebase/storage": patch
---

Cleared retry timeouts when uploads are paused/canceled
17 changes: 8 additions & 9 deletions packages/storage/src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class UploadTask {
private _metadataErrorHandler: (p1: StorageError) => void;
private _resolve?: (p1: UploadTaskSnapshot) => void = undefined;
private _reject?: (p1: StorageError) => void = undefined;
private pendingTimeout?: ReturnType<typeof setTimeout>;
private _promise: Promise<UploadTaskSnapshot>;

private sleepTime: number;
Expand Down Expand Up @@ -191,7 +192,8 @@ export class UploadTask {
// Happens if we miss the metadata on upload completion.
this._fetchMetadata();
} else {
setTimeout(() => {
this.pendingTimeout = setTimeout(() => {
this.pendingTimeout = undefined;
this._continueUpload();
}, this.sleepTime);
}
Expand Down Expand Up @@ -405,20 +407,17 @@ export class UploadTask {
}
switch (state) {
case InternalTaskState.CANCELING:
case InternalTaskState.PAUSING:
// TODO(andysoto):
// assert(this.state_ === InternalTaskState.RUNNING ||
// this.state_ === InternalTaskState.PAUSING);
this._state = state;
if (this._request !== undefined) {
this._request.cancel();
}
break;
case InternalTaskState.PAUSING:
// TODO(andysoto):
// assert(this.state_ === InternalTaskState.RUNNING);
this._state = state;
if (this._request !== undefined) {
this._request.cancel();
} else if (this.pendingTimeout) {
clearTimeout(this.pendingTimeout);
this.pendingTimeout = undefined;
this.completeTransitions_();
}
break;
case InternalTaskState.RUNNING:
Expand Down
137 changes: 127 additions & 10 deletions packages/storage/test/unit/task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { assert, expect } from 'chai';
import { assert, expect, use } from 'chai';
import { FbsBlob } from '../../src/implementation/blob';
import { Location } from '../../src/implementation/location';
import { Unsubscribe } from '../../src/implementation/observer';
Expand All @@ -31,8 +31,13 @@ import {
} from './testshared';
import { injectTestConnection } from '../../src/platform/connection';
import { Deferred } from '@firebase/util';
import { retryLimitExceeded } from '../../src/implementation/error';
import { canceled, retryLimitExceeded } from '../../src/implementation/error';
import { SinonFakeTimers, useFakeTimers } from 'sinon';
import * as sinon from 'sinon';
import sinonChai from 'sinon-chai';
import { DEFAULT_MAX_UPLOAD_RETRY_TIME } from '../../src/implementation/constants';

use(sinonChai);

const testLocation = new Location('bucket', 'object');
const smallBlob = new FbsBlob(new Uint8Array([97]));
Expand Down Expand Up @@ -361,7 +366,7 @@ describe('Firebase Storage > Upload Task', () => {
function handleStateChange(
requestHandler: RequestHandler,
blob: FbsBlob
): Promise<TotalState> {
): { taskPromise: Promise<TotalState>; task: UploadTask } {
const storageService = storageServiceWithHandler(requestHandler);
const task = new UploadTask(
new Reference(storageService, testLocation),
Expand Down Expand Up @@ -410,7 +415,7 @@ describe('Firebase Storage > Upload Task', () => {
}
);

return deferred.promise;
return { taskPromise: deferred.promise, task };
}

it('Calls callback sequences for small uploads correctly', () => {
Expand All @@ -422,13 +427,13 @@ describe('Firebase Storage > Upload Task', () => {
it('properly times out if large blobs returns a 503 when finalizing', async () => {
clock = useFakeTimers();
// Kick off upload
const promise = handleStateChange(
const { taskPromise } = handleStateChange(
fake503ForFinalizeServerHandler(),
bigBlob
);
// Run all timers
await clock.runAllAsync();
const { events, progress } = await promise;
const { events, progress } = await taskPromise;
expect(events.length).to.equal(2);
expect(events[0]).to.deep.equal({ type: 'resume' });
expect(events[1].type).to.deep.equal('error');
Expand Down Expand Up @@ -460,10 +465,13 @@ describe('Firebase Storage > Upload Task', () => {
it('properly times out if large blobs returns a 503 when uploading', async () => {
clock = useFakeTimers();
// Kick off upload
const promise = handleStateChange(fake503ForUploadServerHandler(), bigBlob);
const { taskPromise } = handleStateChange(
fake503ForUploadServerHandler(),
bigBlob
);
// Run all timers
await clock.runAllAsync();
const { events, progress } = await promise;
const { events, progress } = await taskPromise;
expect(events.length).to.equal(2);
expect(events[0]).to.deep.equal({ type: 'resume' });
expect(events[1].type).to.deep.equal('error');
Expand All @@ -478,13 +486,122 @@ describe('Firebase Storage > Upload Task', () => {
});
clock.restore();
});

/**
* Starts upload, finds the first instance of an exponential backoff, and resolves `readyToCancel` when done.
* @returns readyToCancel, taskPromise and task
*/
function resumeCancelSetup(): {
readyToCancel: Promise<null>;
taskPromise: Promise<TotalState>;
task: UploadTask;
} {
clock = useFakeTimers();
const fakeSetTimeout = clock.setTimeout;

let gotFirstEvent = false;

const stub = sinon.stub(global, 'setTimeout');

// Function that notifies when we are in the middle of an exponential backoff
const readyToCancel = new Promise<null>(resolve => {
stub.callsFake((fn, timeout) => {
// @ts-ignore The types for `stub.callsFake` is incompatible with types of `clock.setTimeout`
const res = fakeSetTimeout(fn, timeout);
if (timeout !== DEFAULT_MAX_UPLOAD_RETRY_TIME) {
if (!gotFirstEvent || timeout === 0) {
clock.tick(timeout as number);
} else {
// If the timeout isn't 0 and it isn't the max upload retry time, it's most likely due to exponential backoff.
resolve(null);
}
}
return res;
});
});
readyToCancel.then(
() => stub.restore(),
() => stub.restore()
);
return {
...handleStateChange(
fake503ForUploadServerHandler(undefined, () => (gotFirstEvent = true)),
bigBlob
),
readyToCancel
};
}
it('properly errors with a cancel StorageError if a pending timeout remains', async () => {
// Kick off upload
const { readyToCancel, taskPromise: promise, task } = resumeCancelSetup();

await readyToCancel;
task.cancel();

const { events, progress } = await promise;
expect(events.length).to.equal(2);
expect(events[0]).to.deep.equal({ type: 'resume' });
expect(events[1].type).to.deep.equal('error');
const canceledError = canceled();
expect(events[1].data!.name).to.deep.equal(canceledError.name);
expect(events[1].data!.message).to.deep.equal(canceledError.message);
const blobSize = bigBlob.size();
expect(progress.length).to.equal(1);
expect(progress[0]).to.deep.equal({
bytesTransferred: 0,
totalBytes: blobSize
});
expect(clock.countTimers()).to.eq(0);
clock.restore();
});
it('properly errors with a pause StorageError if a pending timeout remains', async () => {
// Kick off upload
const { readyToCancel, taskPromise: promise, task } = resumeCancelSetup();

await readyToCancel;

task.pause();
expect(clock.countTimers()).to.eq(0);
task.resume();
await clock.runAllAsync();

// Run all timers
const { events, progress } = await promise;
expect(events.length).to.equal(4);
expect(events[0]).to.deep.equal({ type: 'resume' });
expect(events[1]).to.deep.equal({ type: 'pause' });
expect(events[2]).to.deep.equal({ type: 'resume' });
expect(events[3].type).to.deep.equal('error');
const retryError = retryLimitExceeded();
expect(events[3].data!.name).to.deep.equal(retryError.name);
expect(events[3].data!.message).to.deep.equal(retryError.message);
const blobSize = bigBlob.size();
expect(progress.length).to.equal(3);
expect(progress[0]).to.deep.equal({
bytesTransferred: 0,
totalBytes: blobSize
});
expect(progress[1]).to.deep.equal({
bytesTransferred: 0,
totalBytes: blobSize
});
expect(progress[2]).to.deep.equal({
bytesTransferred: 0,
totalBytes: blobSize
});
expect(clock.countTimers()).to.eq(0);
clock.restore();
});
it('tests if small requests that respond with 500 retry correctly', async () => {
clock = useFakeTimers();
// Kick off upload
const promise = handleStateChange(fakeOneShot503ServerHandler(), smallBlob);
const { taskPromise } = handleStateChange(
fakeOneShot503ServerHandler(),
smallBlob
);
// Run all timers
await clock.runAllAsync();
const { events, progress } = await promise;
const { events, progress } = await taskPromise;
expect(events.length).to.equal(2);
expect(events[0]).to.deep.equal({ type: 'resume' });
expect(events[1].type).to.deep.equal('error');
Expand Down
8 changes: 6 additions & 2 deletions packages/storage/test/unit/testshared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ export function fakeServerHandler(

/**
* Responds with a 503 for finalize.
* @param fakeMetadata metadata to respond with for query
* @param fakeMetadata metadata to respond with for finalize
* @returns a handler for requests
*/
export function fake503ForFinalizeServerHandler(
Expand Down Expand Up @@ -459,7 +459,8 @@ export function fake503ForFinalizeServerHandler(
* @returns a handler for requests
*/
export function fake503ForUploadServerHandler(
fakeMetadata: Partial<Metadata> = defaultFakeMetadata
fakeMetadata: Partial<Metadata> = defaultFakeMetadata,
cb?: () => void
): RequestHandler {
const stats: {
[num: number]: {
Expand Down Expand Up @@ -536,6 +537,9 @@ export function fake503ForUploadServerHandler(
const isUpload = commands.indexOf('upload') !== -1;

if (isUpload) {
if (cb) {
cb();
}
return {
status: 503,
body: JSON.stringify(fakeMetadata),
Expand Down

0 comments on commit 5f55ed8

Please sign in to comment.