Skip to content
This repository has been archived by the owner on Jul 31, 2020. It is now read-only.

Commit

Permalink
Add requestUtil.bufferedPut to limit concurrency of put
Browse files Browse the repository at this point in the history
Fix #112
  • Loading branch information
ayumi committed Jun 22, 2017
1 parent 9325e6d commit 214c175
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 3 deletions.
13 changes: 11 additions & 2 deletions client/requestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
const awsSdk = require('aws-sdk')
const cryptoUtil = require('./cryptoUtil')
const proto = require('./constants/proto')
const {limitConcurrency} = require('../lib/promiseHelper')
const s3Helper = require('../lib/s3Helper')
const serializer = require('../lib/serializer')

const CONFIG = require('./config')
const PUT_CONCURRENCY = 1000
const S3_MAX_RETRIES = 1
const EXPIRED_CREDENTIAL_ERRORS = [
/The provided token has expired\./,
Expand Down Expand Up @@ -218,11 +220,12 @@ RequestUtil.prototype.currentRecordPrefix = function (category) {
/**
* Puts a single record, splitting it into multiple objects if needed.
* @param {string} category - the category ID
* @param {Uint8Array} record - the object content, serialized and encrypted
* @param {object} record - the object content
*/
RequestUtil.prototype.put = function (category, record) {
const encryptedRecord = this.encrypt(record)
const s3Prefix = this.currentRecordPrefix(category)
const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, record)
const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, encryptedRecord)
return this.withRetry(() => {
const fetchPromises = s3Keys.map((key, _i) => {
const params = {
Expand All @@ -236,6 +239,12 @@ RequestUtil.prototype.put = function (category, record) {
})
}

/**
* Like put() but with limited concurrency to avoid out of memory/connection
* errors (net::ERR_INSUFFICIENT_RESOURCES)
*/
RequestUtil.prototype.bufferedPut = limitConcurrency(RequestUtil.prototype.put, PUT_CONCURRENCY)

RequestUtil.prototype.s3PostFormData = function (objectKey) {
let formData = new FormData() // eslint-disable-line
formData.append('key', objectKey)
Expand Down
7 changes: 6 additions & 1 deletion client/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ const startSync = (requester) => {
ipc.send(messages.RESOLVED_SYNC_RECORDS, category, resolvedRecords)
})
ipc.on(messages.SEND_SYNC_RECORDS, (e, category, records) => {
logSync(`Sending ${records.length} records`)
if (!proto.categories[category]) {
throw new Error(`Unsupported sync category: ${category}`)
}
// let puts = []
records.forEach((record) => {
// Workaround #17
record.deviceId = new Uint8Array(record.deviceId)
Expand All @@ -139,8 +141,11 @@ const startSync = (requester) => {
record.bookmark.parentFolderObjectId = new Uint8Array(record.bookmark.parentFolderObjectId)
}
logSync(`sending record: ${JSON.stringify(record)}`)
requester.put(proto.categories[category], requester.encrypt(record))
const bufferedPut = requester.bufferedPut(proto.categories[category], record)
// puts.push(bufferedPut)
})
// TODO: Respond with sync complete message
// Promise.all(puts)
})
ipc.on(messages.DELETE_SYNC_USER, (e) => {
logSync(`Deleting user!!`)
Expand Down
42 changes: 42 additions & 0 deletions lib/promiseHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict'

/**
* Wrap a Promise-returning function so calls to it fill a queue which has
* a concurrency limit.
* e.g. there is an API rate limited to 10 concurrent connections.
* const getApi = (arg) => window.fetch(arg)
* const throttledGetApi = limitConcurrency(getApi, 10)
* for (let i; i < 1000; i++) { throttledGetApi(i) }
* @param fn {function} Function which returns a Promise
* @param concurrency {number} Maximum pending/concurrent fn calls
* @returns {function}
*/
module.exports.limitConcurrency = function (fn, concurrency) {
var queue = null
var active = []
const enqueueFnFactory = function (_this, args) {
return function () {
const enqueued = fn.apply(_this, args)
enqueued.then(function () {
active.splice(active.indexOf(enqueued), 1)
})
active.push(enqueued)
return {
enqueued,
newQueue: Promise.race(active),
}
}
}
return function () {
var enqueueFn = enqueueFnFactory(this, arguments)
if (active.length < concurrency) {
const promises = enqueueFn()
queue = promises.newQueue
return promises.enqueued
} else {
const advanceQueue = queue.then(enqueueFn)
queue = advanceQueue.then(promises => promises.newQueue)
return advanceQueue.then(promises => promises.enqueued)
}
}
}
34 changes: 34 additions & 0 deletions test/promiseHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const test = require('tape')
const promiseHelper = require('../lib/promiseHelper')

test('promiseHelper', (t) => {
t.plan(1)

t.test('limitConcurrency', (t) => {
t.plan(1)

t.test('calls the original function the same number of times with correct args', (t) => {
t.plan(2)
const EXPECTED_CALL_COUNT = 100
let callCount = 0
const asyncFun = (i) => new Promise((resolve, reject) => {
setTimeout(() => {
callCount += 1
resolve(i)
}, Math.round(10 * Math.random()))
})
const throttedAsyncFun = promiseHelper.limitConcurrency(asyncFun, 3)
const promises = []
let expectedSum = 0
for (let i = 0; i < EXPECTED_CALL_COUNT; i++) {
promises.push(throttedAsyncFun(i))
expectedSum += i
}
Promise.all(promises).then((results) => {
const sum = results.reduce((a, b) => a + b)
t.equal(callCount, EXPECTED_CALL_COUNT)
t.equal(sum, expectedSum)
})
})
})
})

0 comments on commit 214c175

Please sign in to comment.