Skip to content

Commit

Permalink
fix(@aws-amplify/storage): throw error if all upload parts complete b…
Browse files Browse the repository at this point in the history
…ut upload cannot be finished (#9317)

* fix(@aws-amplify/storage): throw error if all upload parts complete but upload cannot be finished

* fix(@aws-amplify/storage): updating tests

* fix(@aws-amplify/storage): fixed unit tests

* fix(@aws-amplify/storage): revert prettification

* fix(@aws-amplify/storage): revert prettification

* fix(@aws-amplify/storage): revert prettification

* working on tests

* response to auchu@ feedback

* quick test fix

* fix(storage): reverted trial changes

* fix(storage):  throw error if all upload parts complete but upload cannot be finished

* fix(storage): resolving merge conflicts from main

* fix(storage): adjusting tests

* fix: Remove cancel variable entirely

Co-authored-by: Aaron S <94858815+stocaaro@users.noreply.github.com>
  • Loading branch information
katiegoines and stocaaro authored May 16, 2022
1 parent 29a40cc commit 798a8f0
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ describe('multi part upload tests', () => {
try {
await uploader.upload();
} catch (error) {
expect(error.message).toBe('Upload was cancelled.');
expect(error.message).toBe('Part 2 just going to fail in 100ms');
}

// Should have called 5 times =>
Expand Down Expand Up @@ -314,27 +314,24 @@ describe('multi part upload tests', () => {
Key: testParams.Key,
UploadId: testUploadId,
});
// Progress reporting works as well
expect(eventSpy).toHaveBeenNthCalledWith(1, {
key: testParams.Key,
loaded: testMinPartSize,
part: 1,
total: testParams.Body.length,
});
expect(eventSpy).toHaveBeenNthCalledWith(2, {
key: testParams.Key,
loaded: testParams.Body.length,
part: 2,
total: testParams.Body.length,
});

// As the 'sendUploadProgress' happens when the upload is 100% complete,
// it won't be called, as an error is thrown before upload completion.
expect(eventSpy).toBeCalledTimes(0);
});

test('error case: cleanup failed', async () => {
jest.spyOn(S3Client.prototype, 'send').mockImplementation(async command => {
if (command instanceof CreateMultipartUploadCommand) {
return Promise.resolve({ UploadId: testUploadId });
} else if (command instanceof UploadPartCommand) {
return Promise.reject(new Error('failed to upload'));
return Promise.resolve({
PartNumber: testParams.part,
Body: testParams.body,
UploadId: testUploadId,
Key: testParams.key,
Bucket: testParams.bucket,
});
} else if (command instanceof ListPartsCommand) {
return Promise.resolve({
Parts: [
Expand All @@ -353,7 +350,7 @@ describe('multi part upload tests', () => {
new events.EventEmitter()
);
await expect(uploader.upload()).rejects.toThrow(
'Upload was cancelled. Multi Part upload clean up failed'
'Multipart upload clean up failed.'
);
});

Expand All @@ -366,7 +363,7 @@ describe('multi part upload tests', () => {
ETag: 'test_etag_' + command.input.PartNumber,
});
} else if (command instanceof CompleteMultipartUploadCommand) {
return Promise.reject('error');
return Promise.reject(new Error('Error completing multipart upload.'));
}
});
const loggerSpy = jest.spyOn(Logger.prototype, '_log');
Expand All @@ -375,11 +372,10 @@ describe('multi part upload tests', () => {
testOpts,
new events.EventEmitter()
);
await uploader.upload();
expect(loggerSpy).toHaveBeenCalledWith(
'ERROR',
'error happened while finishing the upload. Cancelling the multipart upload',
'error'
);
});

await expect(uploader.upload()).rejects.toThrow('Error completing multipart upload.');
expect(loggerSpy).toHaveBeenNthCalledWith(1, 'DEBUG', 'testUploadId');
expect(loggerSpy).toHaveBeenNthCalledWith(2, 'ERROR', 'Error happened while finishing the upload.');
expect(loggerSpy).toHaveBeenNthCalledWith(3, 'ERROR', 'Error. Cancelling the multipart upload.');
})
});
164 changes: 75 additions & 89 deletions packages/storage/src/providers/AWSS3ProviderManagedUpload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ export class AWSS3ProviderManagedUpload {
private params: PutObjectRequest = null;
private opts = null;
private completedParts: CompletedPart[] = [];
private cancel = false;
private s3client: S3Client;
private uploadId = null;

// Progress reporting
private bytesUploaded = 0;
Expand All @@ -74,79 +74,87 @@ export class AWSS3ProviderManagedUpload {
}

public async upload() {
this.body = await this.validateAndSanitizeBody(this.params.Body);
this.totalBytesToUpload = this.byteLength(this.body);
if (this.totalBytesToUpload <= this.minPartSize) {
// Multipart upload is not required. Upload the sanitized body as is
this.params.Body = this.body;
const putObjectCommand = new PutObjectCommand(this.params);
return this.s3client.send(putObjectCommand);
} else {
// Step 1: Initiate the multi part upload
const uploadId = await this.createMultiPartUpload();

// Step 2: Upload chunks in parallel as requested
const numberOfPartsToUpload = Math.ceil(
this.totalBytesToUpload / this.minPartSize
);

const parts: Part[] = this.createParts();
for (
let start = 0;
start < numberOfPartsToUpload;
start += this.queueSize
) {
/** This first block will try to cancel the upload if the cancel
* request came before any parts uploads have started.
**/
await this.checkIfUploadCancelled(uploadId);

// Upload as many as `queueSize` parts simultaneously
await this.uploadParts(
uploadId,
parts.slice(start, start + this.queueSize)
try {
this.body = await this.validateAndSanitizeBody(this.params.Body);
this.totalBytesToUpload = this.byteLength(this.body);
if (this.totalBytesToUpload <= this.minPartSize) {
// Multipart upload is not required. Upload the sanitized body as is
this.params.Body = this.body;
const putObjectCommand = new PutObjectCommand(this.params);
return this.s3client.send(putObjectCommand);
} else {
// Step 1: Initiate the multi part upload
this.uploadId = await this.createMultiPartUpload();

// Step 2: Upload chunks in parallel as requested
const numberOfPartsToUpload = Math.ceil(
this.totalBytesToUpload / this.minPartSize
);

/** Call cleanup a second time in case there were part upload requests
* in flight. This is to ensure that all parts are cleaned up.
*/
await this.checkIfUploadCancelled(uploadId);
}
const parts: Part[] = this.createParts();
for (
let start = 0;
start < numberOfPartsToUpload;
start += this.queueSize
) {

// Upload as many as `queueSize` parts simultaneously
await this.uploadParts(
this.uploadId,
parts.slice(start, start + this.queueSize)
);
}

parts.map(part => {
this.removeEventListener(part);
});
parts.map(part => {
this.removeEventListener(part);
});

// Step 3: Finalize the upload such that S3 can recreate the file
return await this.finishMultiPartUpload(uploadId);
// Step 3: Finalize the upload such that S3 can recreate the file
return await this.finishMultiPartUpload(this.uploadId);
}
} catch (error) {
// if any error is thrown, call cleanup
await this.cleanup(this.uploadId);
logger.error('Error. Cancelling the multipart upload.');
throw error;
}
}

private createParts(): Part[] {
const parts: Part[] = [];
for (let bodyStart = 0; bodyStart < this.totalBytesToUpload; ) {
const bodyEnd = Math.min(
bodyStart + this.minPartSize,
this.totalBytesToUpload
);
parts.push({
bodyPart: this.body.slice(bodyStart, bodyEnd),
partNumber: parts.length + 1,
emitter: new events.EventEmitter(),
_lastUploadedBytes: 0,
});
bodyStart += this.minPartSize;
try {
const parts: Part[] = [];
for (let bodyStart = 0; bodyStart < this.totalBytesToUpload; ) {
const bodyEnd = Math.min(
bodyStart + this.minPartSize,
this.totalBytesToUpload
);
parts.push({
bodyPart: this.body.slice(bodyStart, bodyEnd),
partNumber: parts.length + 1,
emitter: new events.EventEmitter(),
_lastUploadedBytes: 0,
});
bodyStart += this.minPartSize;
}
return parts;
} catch (error) {
logger.error(error);
throw error;
}
return parts;
}

private async createMultiPartUpload() {
const createMultiPartUploadCommand = new CreateMultipartUploadCommand(
this.params
);
const response = await this.s3client.send(createMultiPartUploadCommand);
logger.debug(response.UploadId);
return response.UploadId;
try {
const createMultiPartUploadCommand = new CreateMultipartUploadCommand(
this.params
);
const response = await this.s3client.send(createMultiPartUploadCommand);
logger.debug(response.UploadId);
return response.UploadId;
} catch (error) {
logger.error(error);
throw error;
}
}

/**
Expand Down Expand Up @@ -191,11 +199,9 @@ export class AWSS3ProviderManagedUpload {
}
} catch (error) {
logger.error(
'error happened while uploading a part. Cancelling the multipart upload',
error
'Error happened while uploading a part. Cancelling the multipart upload'
);
this.cancelUpload();
return;
throw error;
}
}

Expand All @@ -211,31 +217,11 @@ export class AWSS3ProviderManagedUpload {
const data = await this.s3client.send(completeUploadCommand);
return data.Key;
} catch (error) {
logger.error(
'error happened while finishing the upload. Cancelling the multipart upload',
error
);
this.cancelUpload();
return;
logger.error('Error happened while finishing the upload.');
throw error;
}
}

private async checkIfUploadCancelled(uploadId: string) {
if (this.cancel) {
let errorMessage = 'Upload was cancelled.';
try {
await this.cleanup(uploadId);
} catch (error) {
errorMessage += ` ${error.message}`;
}
throw new Error(errorMessage);
}
}

public cancelUpload() {
this.cancel = true;
}

private async cleanup(uploadId: string) {
// Reset this's state
this.body = null;
Expand All @@ -255,7 +241,7 @@ export class AWSS3ProviderManagedUpload {
const data = await this.s3client.send(new ListPartsCommand(input));

if (data && data.Parts && data.Parts.length > 0) {
throw new Error('Multi Part upload clean up failed');
throw new Error('Multipart upload clean up failed.');
}
}

Expand Down

0 comments on commit 798a8f0

Please sign in to comment.