Skip to content

Commit

Permalink
feat(browser): multipartUpload err will cancel this task
Browse files Browse the repository at this point in the history
  • Loading branch information
binghaiwang committed Mar 23, 2018
1 parent 0f003aa commit 698a241
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 29 deletions.
63 changes: 36 additions & 27 deletions lib/browser/managed_upload.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ const proto = exports;
* @param {String} name
* @param {String|File} file
* @param {Object} options
* {Object} options.callback The callback parameter is composed of a JSON string encoded in Base64
* {String} options.callback.url the OSS sends a callback request to this URL
* {String} options.callback.host The host header value for initiating callback requests
* {String} options.callback.body The value of the request body when a callback is initiated
* {String} options.callback.contentType The Content-Type of the callback requests initiatiated
* {Object} options.callback.customValue Custom parameters are a map of key-values, e.g:
* {Object} options.callback
* The callback parameter is composed of a JSON string encoded in Base64
* {String} options.callback.url
* the OSS sends a callback request to this URL
* {String} options.callback.host
* The host header value for initiating callback requests
* {String} options.callback.body
* The value of the request body when a callback is initiated
* {String} options.callback.contentType
* The Content-Type of the callback requests initiatiated
* {Object} options.callback.customValue
* Custom parameters are a map of key-values, e.g:
* customValue = {
* key1: 'value1',
* key2: 'value2'
Expand Down Expand Up @@ -107,7 +113,7 @@ proto._resumeMultipart = function* _resumeMultipart(checkpoint, options) {
const partOffs = this._divideParts(fileSize, partSize);
const numParts = partOffs.length;

const uploadPartJob = function* (self, partNo) {
const uploadPartJob = function* uploadPartJob(self, partNo) {
if (!self.isCancel()) {
try {
const pi = partOffs[partNo - 1];
Expand All @@ -117,16 +123,19 @@ proto._resumeMultipart = function* _resumeMultipart(checkpoint, options) {
};

const result = yield self._uploadPart(name, uploadId, partNo, data);
doneParts.push({
number: partNo,
etag: result.res.headers.etag,
});
checkpoint.doneParts = doneParts;

if (!self.isCancel() && options && options.progress) {
yield options.progress(doneParts.length / numParts, checkpoint, result.res);
if (!self.isCancel()) {
doneParts.push({
number: partNo,
etag: result.res.headers.etag,
});
checkpoint.doneParts = doneParts;

if (options && options.progress) {
yield options.progress(doneParts.length / numParts, checkpoint, result.res);
}
}
} catch (err) {
self.cancel();
err.partNum = partNo;
throw err;
}
Expand Down Expand Up @@ -156,24 +165,25 @@ proto._resumeMultipart = function* _resumeMultipart(checkpoint, options) {
// start uploads jobs
const errors = yield this._thunkPool(jobs, parallel);

if (this.isCancel()) {
jobs = null;
throw this._makeCancelEvent();
}

// check errors after all jobs are completed
if (errors && errors.length > 0) {
this.resetCancelFlag();
const err = errors[0];
err.message = `Failed to upload some parts with error: ${err.toString()} part_num: ${err.partNum}`;
throw err;
}

if (this.isCancel()) {
jobs = null;
throw this._makeCancelEvent();
}
}
return yield this.completeMultipartUpload(name, uploadId, doneParts, options);
};


is.file = function (file) {
return typeof (File) !== 'undefined' && file instanceof File;
is.file = function file(obj) {
return typeof (File) !== 'undefined' && obj instanceof File;
};

/**
Expand Down Expand Up @@ -241,7 +251,7 @@ WebFileReadStream.prototype._read = function _read(size) {
size = size || defaultReadSize;

const that = this;
this.reader.onload = function (e) {
this.reader.onload = function onload(e) {
that.fileBuffer = new Buffer(new Uint8Array(e.target.result));
that.file = null;
that.readFileAndPush(size);
Expand Down Expand Up @@ -270,7 +280,7 @@ proto._createStream = function _createStream(file, start, end) {

proto._getPartSize = function _getPartSize(fileSize, partSize) {
const maxNumParts = 10 * 1000;
const defaultPartSize = 1 * 1024 * 1024;
const defaultPartSize = 1024 * 1024;

if (!partSize) {
return defaultPartSize;
Expand Down Expand Up @@ -300,10 +310,9 @@ proto._divideParts = function _divideParts(fileSize, partSize) {
};

// cancel is not error , so create an object
proto._makeCancelEvent = function () {
const cancelEvent = {
proto._makeCancelEvent = function _makeCancelEvent() {
return {
status: 0,
name: 'cancel',
};
return cancelEvent;
};
2 changes: 1 addition & 1 deletion lib/common/thunkpool.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ proto._thunkPool = function thunkPool(thunks, parallel) {
if (endQueueSum === concurrency) {
queue.fns = [];
queue.buffer = [];
resolve();
resolve(_errs);
}
}

Expand Down
9 changes: 8 additions & 1 deletion test/browser/browser.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -786,27 +786,34 @@ describe('browser', () => {
const name = `${prefix}multipart/upload-file-exception`;

const stubUploadPart = sinon.stub(this.store, '_uploadPart');
stubUploadPart.throws('TestUploadPartException');
const testUploadPartException = new Error();
testUploadPartException.name = 'TestUploadPartException';
testUploadPartException.status = 403;
stubUploadPart.throws(testUploadPartException);

let errorMsg = '';
let partNumz = 0;
let errStatus = 0;
try {
yield this.store.multipartUpload(name, file, {
progress() {
return function (done) {
done();
};
},
partSize: 100 * 1024,
});
} catch (err) {
errorMsg = err.message;
partNumz = err.partNum;
errStatus = err.status;
}
assert.equal(
errorMsg,
'Failed to upload some parts with error: TestUploadPartException part_num: 1',
);
assert.equal(partNumz, 1);
assert.equal(errStatus, 403);
this.store._uploadPart.restore();
});

Expand Down

0 comments on commit 698a241

Please sign in to comment.