diff --git a/client/requestUtil.js b/client/requestUtil.js index 0ab3ed8..144fb13 100644 --- a/client/requestUtil.js +++ b/client/requestUtil.js @@ -3,10 +3,12 @@ const awsSdk = require('aws-sdk') const cryptoUtil = require('./cryptoUtil') const proto = require('./constants/proto') +const {limitConcurrency} = require('../lib/promiseHelper') const s3Helper = require('../lib/s3Helper') const serializer = require('../lib/serializer') const CONFIG = require('./config') +const PUT_CONCURRENCY = 1000 const S3_MAX_RETRIES = 1 const EXPIRED_CREDENTIAL_ERRORS = [ /The provided token has expired\./, @@ -218,11 +220,12 @@ RequestUtil.prototype.currentRecordPrefix = function (category) { /** * Puts a single record, splitting it into multiple objects if needed. * @param {string} category - the category ID - * @param {Uint8Array} record - the object content, serialized and encrypted + * @param {object} record - the object content */ RequestUtil.prototype.put = function (category, record) { + const encryptedRecord = this.encrypt(record) const s3Prefix = this.currentRecordPrefix(category) - const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, record) + const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, encryptedRecord) return this.withRetry(() => { const fetchPromises = s3Keys.map((key, _i) => { const params = { @@ -236,6 +239,12 @@ RequestUtil.prototype.put = function (category, record) { }) } +/** + * Like put() but with limited concurrency to avoid out of memory/connection + * errors (net::ERR_INSUFFICIENT_RESOURCES) + */ +RequestUtil.prototype.bufferedPut = limitConcurrency(RequestUtil.prototype.put, PUT_CONCURRENCY) + RequestUtil.prototype.s3PostFormData = function (objectKey) { let formData = new FormData() // eslint-disable-line formData.append('key', objectKey) diff --git a/client/sync.js b/client/sync.js index efcdc2d..8d4f615 100644 --- a/client/sync.js +++ b/client/sync.js @@ -128,9 +128,11 @@ const startSync = (requester) => { ipc.send(messages.RESOLVED_SYNC_RECORDS, category, resolvedRecords) }) ipc.on(messages.SEND_SYNC_RECORDS, (e, category, records) => { + logSync(`Sending ${records.length} records`) if (!proto.categories[category]) { throw new Error(`Unsupported sync category: ${category}`) } + // let puts = [] records.forEach((record) => { // Workaround #17 record.deviceId = new Uint8Array(record.deviceId) @@ -139,8 +141,11 @@ const startSync = (requester) => { record.bookmark.parentFolderObjectId = new Uint8Array(record.bookmark.parentFolderObjectId) } logSync(`sending record: ${JSON.stringify(record)}`) - requester.put(proto.categories[category], requester.encrypt(record)) + const bufferedPut = requester.bufferedPut(proto.categories[category], record) + // puts.push(bufferedPut) }) + // TODO: Respond with sync complete message + // Promise.all(puts) }) ipc.on(messages.DELETE_SYNC_USER, (e) => { logSync(`Deleting user!!`) diff --git a/lib/promiseHelper.js b/lib/promiseHelper.js new file mode 100644 index 0000000..6e327f1 --- /dev/null +++ b/lib/promiseHelper.js @@ -0,0 +1,42 @@ +'use strict' + +/** + * Wrap a Promise-returning function so calls to it fill a queue which has + * a concurrency limit. + * e.g. there is an API rate limited to 10 concurrent connections. + * const getApi = (arg) => window.fetch(arg) + * const throttledGetApi = limitConcurrency(getApi, 10) + * for (let i; i < 1000; i++) { throttledGetApi(i) } + * @param fn {function} Function which returns a Promise + * @param concurrency {number} Maximum pending/concurrent fn calls + * @returns {function} + */ +module.exports.limitConcurrency = function (fn, concurrency) { + var queue = null + var active = [] + const enqueueFnFactory = function (_this, args) { + return function () { + const enqueued = fn.apply(_this, args) + enqueued.then(function () { + active.splice(active.indexOf(enqueued), 1) + }) + active.push(enqueued) + return { + enqueued, + newQueue: Promise.race(active), + } + } + } + return function () { + var enqueueFn = enqueueFnFactory(this, arguments) + if (active.length < concurrency) { + const promises = enqueueFn() + queue = promises.newQueue + return promises.enqueued + } else { + const advanceQueue = queue.then(enqueueFn) + queue = advanceQueue.then(promises => promises.newQueue) + return advanceQueue.then(promises => promises.enqueued) + } + } +} diff --git a/test/promiseHelper.js b/test/promiseHelper.js new file mode 100644 index 0000000..4346287 --- /dev/null +++ b/test/promiseHelper.js @@ -0,0 +1,34 @@ +const test = require('tape') +const promiseHelper = require('../lib/promiseHelper') + +test('promiseHelper', (t) => { + t.plan(1) + + t.test('limitConcurrency', (t) => { + t.plan(1) + + t.test('calls the original function the same number of times with correct args', (t) => { + t.plan(2) + const EXPECTED_CALL_COUNT = 100 + let callCount = 0 + const asyncFun = (i) => new Promise((resolve, reject) => { + setTimeout(() => { + callCount += 1 + resolve(i) + }, Math.round(10 * Math.random())) + }) + const throttedAsyncFun = promiseHelper.limitConcurrency(asyncFun, 3) + const promises = [] + let expectedSum = 0 + for (let i = 0; i < EXPECTED_CALL_COUNT; i++) { + promises.push(throttedAsyncFun(i)) + expectedSum += i + } + Promise.all(promises).then((results) => { + const sum = results.reduce((a, b) => a + b) + t.equal(callCount, EXPECTED_CALL_COUNT) + t.equal(sum, expectedSum) + }) + }) + }) +})