Skip to content

Commit

Permalink
Fix/retry (#939)
Browse files Browse the repository at this point in the history
* fix: retry when could rebuild params.stream and ignore when not

* fix: test case under Nodejs12

* fix: ignore stream

* docs: retryMax

* fix: codecov

* fix: test case about retry

* fix: rename _createBuffer
  • Loading branch information
beajer committed Mar 17, 2021
1 parent 1c676da commit 443c841
Show file tree
Hide file tree
Showing 13 changed files with 6,220 additions and 6,015 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ options:
`fetch` mode ,else `XMLHttpRequest`
- [enableProxy] {Boolean}, Enable proxy request, default is false.
- [proxy] {String | Object}, proxy agent uri or options, default is null.
- [retryMax] {Number}, used by auto retry send request count when request error is net error or timeout.
- [retryMax] {Number}, the maximum number of times a request is automatically retried when it fails with a network error or timeout. **_NOTE:_** Does not apply to `put` with a stream, `putStream`, or `append` with a stream, because a stream can only be consumed once

example:

Expand Down Expand Up @@ -396,6 +396,19 @@ const store = new OSS({
});
```

5. Retrying a request that uploads a stream
```js
for (let i = 0; i <= store.options.retryMax; i++) {
try {
const result = await store.putStream("<example-object>", fs.createReadStream("<example-path>"));
console.log(result);
break; // break if success
} catch (e) {
console.log(e);
}
}
```

## Bucket Operations

### .listBuckets(query[, options])
Expand Down
11,614 changes: 5,865 additions & 5,749 deletions dist/aliyun-oss-sdk.js

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions dist/aliyun-oss-sdk.min.js

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions lib/browser/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,24 +195,22 @@ proto.authorization = function authorization(method, resource, subres, headers)
*/

proto.request = async function (params) {
const isAvailableStream = params.stream ? params.stream.readable : true;
if (this.options.retryMax && isAvailableStream) {
this.request = retry(request.bind(this), this.options.retryMax, {
if (this.options.retryMax) {
return await retry(request.bind(this), this.options.retryMax, {
errorHandler: (err) => {
const _errHandle = (_err) => {
if (params.stream) return false;
const statusErr = [-1, -2].includes(_err.status);
const requestErrorRetryHandle = this.options.requestErrorRetryHandle || (() => true);
return statusErr && requestErrorRetryHandle(_err);
};
if (_errHandle(err)) return true;
return false;
}
});
})(params);
} else {
this.request = request.bind(this);
return request.call(this, params);
}

return await this.request(params);
};

async function request(params) {
Expand Down
97 changes: 40 additions & 57 deletions lib/browser/managed-upload.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


// var debug = require('debug')('ali-oss:multipart');
const util = require('util');
const path = require('path');
Expand All @@ -9,6 +7,7 @@ const { isBlob } = require('../common/utils/isBlob');
const { isFile } = require('../common/utils/isFile');
const { isArray } = require('../common/utils/isArray');
const { isBuffer } = require('../common/utils/isBuffer');
const { retry } = require('../common/utils/retry');

const proto = exports;

Expand Down Expand Up @@ -103,7 +102,6 @@ proto.multipartUpload = async function multipartUpload(name, file, options = {})
await options.progress(0, checkpoint, initResult.res);
}


return await this._resumeMultipart(checkpoint, options);
};

Expand All @@ -118,9 +116,7 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
if (this.isCancel()) {
throw this._makeCancelEvent();
}
const {
file, fileSize, partSize, uploadId, doneParts, name
} = checkpoint;
const { file, fileSize, partSize, uploadId, doneParts, name } = checkpoint;

const internalDoneParts = [];

Expand All @@ -132,45 +128,24 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
const numParts = partOffs.length;
let multipartFinish = false;

let uploadPartJob = function uploadPartJob(self, partNo) {
let uploadPartJob = (self, partNo) => {
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
try {
if (!self.isCancel()) {
const pi = partOffs[partNo - 1];
const stream = self._createStream(file, pi.start, pi.end);
const content = await self._createBuffer(file, pi.start, pi.end);
const data = {
stream,
content,
size: pi.end - pi.start
};

if (isArray(self.multipartUploadStreams)) {
self.multipartUploadStreams.push(stream);
} else {
self.multipartUploadStreams = [stream];
}

const removeStreamFromMultipartUploadStreams = function () {
if (!stream.destroyed) {
stream.destroy();
}
const index = self.multipartUploadStreams.indexOf(stream);
if (index !== -1) {
self.multipartUploadStreams.splice(index, 1);
}
};

stream.on('close', removeStreamFromMultipartUploadStreams);
stream.on('end', removeStreamFromMultipartUploadStreams);
stream.on('error', removeStreamFromMultipartUploadStreams);

let result;
try {
result = await self._uploadPart(name, uploadId, partNo, data, {
timeout: options.timeout
});
} catch (error) {
removeStreamFromMultipartUploadStreams();
if (error.status === 404) {
throw self._makeAbortEvent();
}
Expand Down Expand Up @@ -215,16 +190,22 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
const parallel = options.parallel || defaultParallel;

// upload in parallel
const jobErr = await this._parallel(todo, parallel, value => new Promise((resolve, reject) => {
uploadPartJob(that, value).then((result) => {
if (result) {
internalDoneParts.push(result);
}
resolve();
}).catch((err) => {
reject(err);
});
}));
const jobErr = await this._parallel(
todo,
parallel,
value => new Promise((resolve, reject) => {
uploadPartJob(that, value)
.then(result => {
if (result) {
internalDoneParts.push(result);
}
resolve();
})
.catch(err => {
reject(err);
});
})
);
multipartFinish = true;

const abortEvent = jobErr.find(err => err.name === 'abort');
Expand All @@ -236,7 +217,9 @@ proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
}

if (jobErr && jobErr.length > 0) {
jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${jobErr[0].partNum}`;
jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${
jobErr[0].partNum
}`;
throw jobErr[0];
}
return await this.completeMultipartUpload(name, uploadId, internalDoneParts, options);
Expand Down Expand Up @@ -289,9 +272,12 @@ WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
};

WebFileReadStream.prototype._read = function _read(size) {
if ((this.file && this.start >= this.file.size) ||
(this.fileBuffer && this.start >= this.fileBuffer.length) ||
(this.finish) || (this.start === 0 && !this.file)) {
if (
(this.file && this.start >= this.file.size) ||
(this.fileBuffer && this.start >= this.fileBuffer.length) ||
this.finish ||
(this.start === 0 && !this.file)
) {
if (!this.finish) {
this.fileBuffer = null;
this.finish = true;
Expand All @@ -317,21 +303,16 @@ WebFileReadStream.prototype._read = function _read(size) {
}
};

proto._createStream = function _createStream(file, start, end) {
proto._createBuffer = async function _createBuffer(file, start, end) {
if (isBlob(file) || isFile(file)) {
return new WebFileReadStream(file.slice(start, end));
const _file = file.slice(start, end);
const fileContent = await _file.arrayBuffer();
return Buffer.from(fileContent);
} else if (isBuffer(file)) {
// we can't use Readable.from() since it is only support in Node v10
const iterable = file.subarray(start, end);
return new Readable({
read() {
this.push(iterable);
this.push(null);
}
});
return file.subarray(start, end);
} else {
throw new Error('_createBuffer requires File/Blob/Buffer.');
}

throw new Error('_createStream requires Buffer/File/Blob.');
};

proto._getPartSize = function _getPartSize(fileSize, partSize) {
Expand All @@ -343,7 +324,9 @@ proto._getPartSize = function _getPartSize(fileSize, partSize) {

if (partSize < safeSize) {
partSize = safeSize;
console.warn(`partSize has been set to ${partSize}, because the partSize you provided causes partNumber to be greater than 10,000`);
console.warn(
`partSize has been set to ${partSize}, because the partSize you provided causes partNumber to be greater than 10,000`
);
}
return partSize;
};
Expand Down
15 changes: 2 additions & 13 deletions lib/browser/object.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ proto.append = async function append(name, file, options) {
proto.put = async function put(name, file, options) {
let content;
options = options || {};
options.headers = options.headers || {};
name = this._objectName(name);
if (isBuffer(file)) {
content = file;
Expand All @@ -70,24 +71,12 @@ proto.put = async function put(name, file, options) {
}
}

const stream = this._createStream(file, 0, file.size);
content = await this._createBuffer(file, 0, file.size);
options.contentLength = await this._getFileSize(file);
try {
const result = await this.putStream(name, stream, options);
return result;
} catch (err) {
if (err.code === 'RequestTimeTooSkewed') {
this.options.amendTimeSkewed = +new Date(err.serverTime) - new Date();
return await this.put(name, file, options);
} else {
throw err;
}
}
} else {
throw new TypeError('Must provide Buffer/Blob/File for put.');
}

options.headers = options.headers || {};
this._convertMetaToHeaders(options.meta, options.headers);

const method = options.method || 'PUT';
Expand Down
20 changes: 10 additions & 10 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,22 @@ proto.authorization = function authorization(method, resource, subres, headers)
*/

proto.request = async function (params) {
const isAvailableStream = params.stream ? params.stream.readable : true;
if (this.options.retryMax && isAvailableStream) {
this.request = retry(request.bind(this), this.options.retryMax, {
errorHandler: (err) => {
const _errHandle = (_err) => {
if (this.options.retryMax) {
return await retry(request.bind(this), this.options.retryMax, {
errorHandler: err => {
const _errHandle = _err => {
if (params.stream) return false;
const statusErr = [-1, -2].includes(_err.status);
const requestErrorRetryHandle = this.options.requestErrorRetryHandle || (() => true);
return statusErr && requestErrorRetryHandle(_err);
};
if (_errHandle(err)) return true;
return false;
}
});
})(params);
} else {
this.request = request.bind(this);
return await request.call(this, params);
}

return await this.request(params);
};

async function request(params) {
Expand Down Expand Up @@ -218,7 +216,9 @@ async function request(params) {
if (!this._setOptions || Date.now() - this._setOptions > 10000) {
this._setOptions = Date.now();
await setSTSToken.call(this);
return this.request(params);
if (!params.stream) {
return this.request(params);
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/common/client/initOptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ module.exports = function (options) {
isRequestPay: false,
sldEnable: false,
headerEncoding: 'utf-8',
refreshSTSToken: null
refreshSTSToken: null,
retryMax: 0
},
options
);
Expand Down
21 changes: 14 additions & 7 deletions lib/common/multipart.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,12 @@ proto.initMultipartUpload = async function initMultipartUpload(name, options) {
*/
proto.uploadPart = async function uploadPart(name, uploadId, partNo, file, start, end, options) {
const data = {
stream: this._createStream(file, start, end),
size: end - start
};
const isBrowserEnv = process && process.browser;
isBrowserEnv
? (data.content = await this._createBuffer(file, start, end))
: (data.stream = await this._createStream(file, start, end));
return await this._uploadPart(name, uploadId, partNo, data, options);
};

Expand All @@ -168,7 +171,9 @@ proto.uploadPart = async function uploadPart(name, uploadId, partNo, file, start
* }
*/
proto.completeMultipartUpload = async function completeMultipartUpload(name, uploadId, parts, options) {
const completeParts = parts.concat().sort((a, b) => a.number - b.number)
const completeParts = parts
.concat()
.sort((a, b) => a.number - b.number)
.filter((item, index, arr) => !index || item.number !== arr[index - 1].number);
let xml = '<?xml version="1.0" encoding="UTF-8"?>\n<CompleteMultipartUpload>\n';
for (let i = 0; i < completeParts.length; i++) {
Expand All @@ -182,7 +187,7 @@ proto.completeMultipartUpload = async function completeMultipartUpload(name, upl

options = options || {};
let opt = {};
opt = deepCopyWith(options, (_) => {
opt = deepCopyWith(options, _ => {
if (isBuffer(_)) return null;
});
if (opt.headers) delete opt.headers['x-oss-server-side-encryption'];
Expand Down Expand Up @@ -235,7 +240,8 @@ proto._uploadPart = async function _uploadPart(name, uploadId, partNo, data, opt
};
const params = this._objectRequestParams('PUT', name, opt);
params.mime = opt.mime;
params.stream = data.stream;
const isBrowserEnv = process && process.browser;
isBrowserEnv ? (params.content = data.content) : (params.stream = data.stream);
params.successStatuses = [200];

const result = await this.request(params);
Expand All @@ -245,9 +251,10 @@ proto._uploadPart = async function _uploadPart(name, uploadId, partNo, data, opt
'Please set the etag of expose-headers in OSS \n https://help.aliyun.com/document_detail/32069.html'
);
}

data.stream = null;
params.stream = null;
if (data.stream) {
data.stream = null;
params.stream = null;
}
return {
name,
etag: result.res.headers.etag,
Expand Down
Loading

0 comments on commit 443c841

Please sign in to comment.