From bfe6c45f534ce704ae28915cbddb36ddf33d81b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EF=BD=81=EF=BD=99=EF=BD=95=EF=BD=8D=EF=BD=89=C2=A0=20?= =?UTF-8?q?=EF=BD=99=EF=BD=95?= Date: Thu, 22 Jun 2017 17:43:28 +0000 Subject: [PATCH] Add requestUtil.bufferedPut to limit concurrency of put Fix #112 --- client/requestUtil.js | 13 +++++++++++-- client/sync.js | 3 ++- lib/promiseHelper.js | 38 ++++++++++++++++++++++++++++++++++++++ test/promiseHelper.js | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 lib/promiseHelper.js create mode 100644 test/promiseHelper.js diff --git a/client/requestUtil.js b/client/requestUtil.js index 0ab3ed8..e932b5f 100644 --- a/client/requestUtil.js +++ b/client/requestUtil.js @@ -3,10 +3,12 @@ const awsSdk = require('aws-sdk') const cryptoUtil = require('./cryptoUtil') const proto = require('./constants/proto') +const {limitConcurrency} = require('../lib/promiseHelper') const s3Helper = require('../lib/s3Helper') const serializer = require('../lib/serializer') const CONFIG = require('./config') +const PUT_CONCURRENCY = 100 const S3_MAX_RETRIES = 1 const EXPIRED_CREDENTIAL_ERRORS = [ /The provided token has expired\./, @@ -218,11 +220,12 @@ RequestUtil.prototype.currentRecordPrefix = function (category) { /** * Puts a single record, splitting it into multiple objects if needed. * @param {string} category - the category ID - * @param {Uint8Array} record - the object content, serialized and encrypted + * @param {object} record - the object content */ RequestUtil.prototype.put = function (category, record) { + const encryptedRecord = this.encrypt(record) const s3Prefix = this.currentRecordPrefix(category) - const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, record) + const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, encryptedRecord) return this.withRetry(() => { const fetchPromises = s3Keys.map((key, _i) => { const params = { @@ -236,6 +239,12 @@ RequestUtil.prototype.put = function (category, record) { }) } +/** + * Like put() but with limited concurrency to avoid out of memory/connection + * errors (net::ERR_INSUFFICIENT_RESOURCES) + */ +RequestUtil.prototype.bufferedPut = limitConcurrency(RequestUtil.prototype.put, PUT_CONCURRENCY) + RequestUtil.prototype.s3PostFormData = function (objectKey) { let formData = new FormData() // eslint-disable-line formData.append('key', objectKey) diff --git a/client/sync.js b/client/sync.js index efcdc2d..a2cc8a1 100644 --- a/client/sync.js +++ b/client/sync.js @@ -128,6 +128,7 @@ const startSync = (requester) => { ipc.send(messages.RESOLVED_SYNC_RECORDS, category, resolvedRecords) }) ipc.on(messages.SEND_SYNC_RECORDS, (e, category, records) => { + logSync(`Sending ${records.length} records`) if (!proto.categories[category]) { throw new Error(`Unsupported sync category: ${category}`) } @@ -139,7 +140,7 @@ const startSync = (requester) => { record.bookmark.parentFolderObjectId = new Uint8Array(record.bookmark.parentFolderObjectId) } logSync(`sending record: ${JSON.stringify(record)}`) - requester.put(proto.categories[category], requester.encrypt(record)) + requester.bufferedPut(proto.categories[category], record) }) }) ipc.on(messages.DELETE_SYNC_USER, (e) => { diff --git a/lib/promiseHelper.js b/lib/promiseHelper.js new file mode 100644 index 0000000..b1a2838 --- /dev/null +++ b/lib/promiseHelper.js @@ -0,0 +1,38 @@ +'use strict' + +/** + * Wrap a Promise-returning function so calls to it fill a queue which has + * a concurrency limit. + * e.g. there is an API rate limited to 10 concurrent connections. + * const getApi = (arg) => window.fetch(arg) + * const throttledGetApi = limitConcurrency(getApi, 10) + * for (let i; i < 1000; i++) { throttledGetApi(i) } + * @param fn {function} Function which returns a Promise + * @param concurrency {number} Maximum pending/concurrent fn calls + * @returns {function} + */ +module.exports.limitConcurrency = function (fn, concurrency) { + var queue = null + var active = [] + const next = function (_this, args) { + return function () { + const promise = fn.apply(_this, args) + active.push(promise.then(function () { + active.splice(active.indexOf(promise), 1) + })) + return [Promise.race(active), promise] + } + } + return function () { + var putActive = next(this, arguments) + if (active.length < concurrency) { + const result = putActive() + queue = result[0] + return result[1] + } else { + const result = queue.then(putActive) + queue = result.then(nextResult => nextResult[0]) + return result.then(nextResult => nextResult[1]) + } + } +} diff --git a/test/promiseHelper.js b/test/promiseHelper.js new file mode 100644 index 0000000..21b6c0c --- /dev/null +++ b/test/promiseHelper.js @@ -0,0 +1,34 @@ +const test = require('tape') +const promiseHelper = require('../lib/promiseHelper') + +test('promiseHelper', (t) => { + t.plan(1) + + t.test('limitConcurrency', (t) => { + t.plan(1) + + t.test('calls the original function the same number of times with correct args', (t) => { + t.plan(2) + const EXPECTED_CALL_COUNT = 100 + let callCount = 0 + const asyncFun = (i) => new Promise((resolve, reject) => { + setTimeout(() => { + callCount += 1 + resolve(i) + }, 1) + }) + const throttedAsyncFun = promiseHelper.limitConcurrency(asyncFun, 3) + const promises = [] + let expectedSum = 0 + for (let i = 0; i < EXPECTED_CALL_COUNT; i++) { + promises.push(throttedAsyncFun(i)) + expectedSum += i + } + Promise.all(promises).then((results) => { + const sum = results.reduce((a, b) => a + b) + t.equal(callCount, EXPECTED_CALL_COUNT) + t.equal(sum, expectedSum) + }) + }) + }) +})