forked from colinskow/pouch-mirror
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.js
57 lines (50 loc) · 1.42 KB
/
listener.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
var BPromise = require('bluebird');
var defer = require('./defer');
// Time window in milliseconds: passed to `.timeout()` for each waiter and used
// as the age threshold when pruning buffered changes.
var timeLimit = 4900;
module.exports = function (db) {
var pending = {};
var bufferedChanges = {};
// Keep a buffer of recent changes in case the change comes in before our response
var bufferChange = function(rev) {
// console.log('Buffering change ' + rev);
bufferedChanges[rev] = new Date();
// get rid of any expired changes
for(var key in bufferedChanges) {
if((bufferedChanges[key] + timeLimit) < new Date()) {
delete bufferedChanges[key];
}
}
};
// Init Listener
db.changes({ since: 'now', live: true})
.on('change', function (change) {
change.changes.forEach(function (item) {
if(typeof pending[item.rev] !== 'undefined') {
pending[item.rev].resolve({rev: item.rev});
delete(pending[item.rev]);
} else {
bufferChange(item.rev);
}
});
});
var waitForChange = function(rev) {
var deferred = defer();
deferred.promise
.timeout(timeLimit)
.catch(BPromise.TimeoutError, function(err) {
delete(pending[rev]);
console.log(err);
return BPromise.reject(err);
});
if(bufferedChanges[rev]) {
deferred.resolve({rev: rev});
delete bufferedChanges[rev];
} else {
pending[rev] = deferred;
}
return deferred.promise;
};
return {
waitForChange: waitForChange
};
};